Lambda 関数を利用して、Linux OS で起動している暗号化されていない "ルートボリューム" を暗号化されたルートボリュームに自動で置き換えてみた
はじめに
テクニカルサポートの 片方 です。
Lambda 関数を利用して暗号化されていないルートボリュームを、暗号化されたルートボリュームに自動で置き換えてみました。
Windows OS は対象外で、Linux OS のみを対象としてます。
実装してみた
以下の順番で実装します。
- 実行ロール作成
- Lambda 関数作成
元になる暗号化されていないルートボリュームの設定値やタイプはそのままに、暗号化して置き換えます。
実行ロール
※ 信頼関係
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
アタッチするポリシー例
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ec2:StopInstances",
"ec2:StartInstances",
"ec2:DescribeInstances",
"ec2:DescribeSnapshots",
"ec2:DescribeVolumes",
"ec2:CreateSnapshot",
"ec2:CreateVolume",
"ec2:AttachVolume",
"ec2:DetachVolume",
"ec2:DeleteVolume"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "logs:CreateLogGroup",
"Resource": "arn:aws:logs:<region>:<account-id>:*"
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:<region>:<account-id>:log-group:/aws/lambda/<function-name>:*"
]
}
]
}
※ 適宜修正してください。
Lambda 関数
Python 3.12 で作成しました。
実行ロールでは、既存のロールを使用するを選択し、先ほど作成したロールを指定します。
実装する Lambda 関数例
import boto3
import time
ec2_client = boto3.client('ec2')
def lambda_handler(event, context):
# 暗号化されていないルートボリュームを検索
unencrypted_volumes = find_unencrypted_root_volumes()
if not unencrypted_volumes:
print("No unencrypted root volumes found.")
return
for volume in unencrypted_volumes:
volume_id = volume['VolumeId']
print(f"Unencrypted root volume {volume_id} found.")
# アタッチされているインスタンスIDを取得
instance_id = get_attached_instance(volume)
if not instance_id:
print(f"Volume {volume_id} is not attached to any instance.")
continue
# インスタンスのOSがLinuxであることを確認
if not is_linux_instance(instance_id):
print(f"Instance {instance_id} is not Linux. Skipping.")
continue
print(f"Volume {volume_id} is attached to Linux instance {instance_id}. Proceeding with encryption...")
# インスタンスを停止
stop_instance(instance_id)
# AvailabilityZoneとボリュームタイプを取得
availability_zone = volume['AvailabilityZone']
volume_type = volume['VolumeType']
iops = volume.get('Iops') # IOPSを取得 (io1, io2の場合)
# スナップショット作成
snapshot_id = create_snapshot(volume_id)
# 暗号化された新しいボリュームを作成
encrypted_volume_id = create_encrypted_volume(snapshot_id, availability_zone, volume_type, iops)
# 既存のルートボリュームをデタッチ
detach_volume(instance_id, volume_id)
# 新しい暗号化されたボリュームをルートボリュームとしてアタッチ
attach_volume(instance_id, encrypted_volume_id, volume['Attachments'][0]['Device'])
# 古い未暗号化ルートボリュームを削除
delete_volume(volume_id)
# インスタンスを再起動
start_instance(instance_id)
def find_unencrypted_root_volumes():
# 暗号化されていないルートボリュームを取得
response = ec2_client.describe_volumes(Filters=[{'Name': 'encrypted', 'Values': ['false']}])
unencrypted_volumes = []
for volume in response['Volumes']:
instance_id = get_attached_instance(volume)
if instance_id and is_root_volume(volume, instance_id):
unencrypted_volumes.append(volume)
return unencrypted_volumes
def get_attached_instance(volume):
# アタッチされているインスタンスIDを取得
if 'Attachments' in volume and volume['Attachments']:
return volume['Attachments'][0]['InstanceId']
return None
def is_root_volume(volume, instance_id):
# ルートデバイスかどうかを確認
instance_info = ec2_client.describe_instances(InstanceIds=[instance_id])
root_device_name = instance_info['Reservations'][0]['Instances'][0]['RootDeviceName']
return volume['Attachments'][0]['Device'] == root_device_name
def is_linux_instance(instance_id):
# インスタンスがLinuxかどうかを確認
instance_info = ec2_client.describe_instances(InstanceIds=[instance_id])
platform_details = instance_info['Reservations'][0]['Instances'][0].get('PlatformDetails', '')
# Linuxであることを確認
return 'Linux' in platform_details
def create_snapshot(volume_id):
# スナップショット作成
response = ec2_client.create_snapshot(VolumeId=volume_id, Description=f"Snapshot of {volume_id}")
snapshot_id = response['SnapshotId']
print(f"Snapshot {snapshot_id} created for volume {volume_id}")
wait_for_snapshot(snapshot_id)
return snapshot_id
def create_encrypted_volume(snapshot_id, availability_zone, volume_type, iops=None):
# 暗号化されたボリュームを作成
create_volume_params = {
'SnapshotId': snapshot_id,
'AvailabilityZone': availability_zone,
'VolumeType': volume_type,
'Encrypted': True
}
if volume_type in ['io1', 'io2'] and iops:
create_volume_params['Iops'] = iops
response = ec2_client.create_volume(**create_volume_params)
encrypted_volume_id = response['VolumeId']
print(f"Encrypted volume {encrypted_volume_id} created from snapshot {snapshot_id} with type {volume_type}")
wait_for_volume_state(encrypted_volume_id, 'available')
return encrypted_volume_id
def detach_volume(instance_id, volume_id):
# ボリュームをインスタンスからデタッチ
ec2_client.detach_volume(VolumeId=volume_id, InstanceId=instance_id, Force=True)
print(f"Detaching volume {volume_id} from instance {instance_id}...")
wait_for_volume_state(volume_id, 'available')
def attach_volume(instance_id, volume_id, device_name):
# 新しいボリュームをインスタンスにアタッチ
ec2_client.attach_volume(VolumeId=volume_id, InstanceId=instance_id, Device=device_name)
print(f"Attaching volume {volume_id} to instance {instance_id} as {device_name}...")
check_volume_attached(volume_id)
def check_volume_attached(volume_id):
# ボリュームがアタッチされるのを待つ
for attempt in range(10):
volume = ec2_client.describe_volumes(VolumeIds=[volume_id])['Volumes'][0]
if volume['State'] == 'in-use':
print(f"Volume {volume_id} is now in-use state.")
return
time.sleep(5)
raise Exception(f"Volume {volume_id} failed to reach 'in-use' state.")
def delete_volume(volume_id):
# 古いボリュームを削除
ec2_client.delete_volume(VolumeId=volume_id)
print(f"Deleted volume {volume_id}")
def wait_for_volume_state(volume_id, state):
# ボリュームのステータスを待機
waiter = ec2_client.get_waiter(f'volume_{state}')
try:
waiter.wait(
VolumeIds=[volume_id],
WaiterConfig={
'Delay': 15, # 待機時間を15秒に設定
'MaxAttempts': 40 # 最大試行回数を40回に設定(必要に応じて調整)
}
)
except Exception as e:
print(f"Error waiting for volume {volume_id} to be in {state} state: {str(e)}")
raise
print(f"Volume {volume_id} is now in {state} state.")
def wait_for_snapshot(snapshot_id):
# スナップショットの完了を待機
waiter = ec2_client.get_waiter('snapshot_completed')
waiter.wait(SnapshotIds=[snapshot_id])
print(f"Snapshot {snapshot_id} is now completed.")
def stop_instance(instance_id):
# インスタンスを停止
ec2_client.stop_instances(InstanceIds=[instance_id])
print(f"Stopping instance {instance_id}...")
waiter = ec2_client.get_waiter('instance_stopped')
waiter.wait(InstanceIds=[instance_id])
print(f"Instance {instance_id} is stopped.")
def start_instance(instance_id):
# インスタンスを開始
ec2_client.start_instances(InstanceIds=[instance_id])
print(f"Starting instance {instance_id}...")
waiter = ec2_client.get_waiter('instance_running')
waiter.wait(InstanceIds=[instance_id])
print(f"Instance {instance_id} is running.")
※ 適宜修正してください。
なお、暗号化に使用されるキーは、デフォルトで使用される aws/ebs この KMS キーを暗号化に使用します。
検証してみた
暗号化されていないルートボリュームを使用して Linux OS のインスタンスを起動します。
検証のため以下の OS と異なるボリュームタイプをアタッチさせて起動しました。
- Amazon Linux 2023 : io2
- Amazon Linux 2 : io1
- Red Hat Enterprise Linux 9.4 : gp3
- Ubuntu 24.04 : gp2
- CentOS Linux 7.9.2009 : standard
Lambda 関数をテストします。
暫くすると...全て成功しました。
EBS のマネジメントコンソール画面を確認します。暗号化されていました!
暗号化実施前のスナップショットも取得されています。
EC2 側でもステータスチェックが成功しており問題なく起動しています。
まとめ
本ブログが誰かの参考になれば幸いです。
参考資料
- EBS ボリュームで使用している暗号化キーを変更する | AWS re:Post
- Amazon EBS暗号化の使用 - Amazon EBS
- EC2 - Boto3 1.35.14 documentation
アノテーション株式会社について
アノテーション株式会社は、クラスメソッド社のグループ企業として「オペレーション・エクセレンス」を担える企業を目指してチャレンジを続けています。「らしく働く、らしく生きる」のスローガンを掲げ、様々な背景をもつ多様なメンバーが自由度の高い働き方を通してお客様へサービスを提供し続けてきました。現在当社では一緒に会社を盛り上げていただけるメンバーを募集中です。少しでもご興味あれば、アノテーション株式会社WEBサイトをご覧ください。